<?php

/**
 * Redis实现延迟消息队列
 * Class Queue
 */
class Queue
{
    /**
     * 当前实例
     * @var Queue
     */
    private static $instance;

    /**
     * Redis实例
     * @var Redis
     */
    private $redis;

    /**
     * 单例入口
     * @param mixed ...$vars
     * @return Queue
     */
    public static function instance(...$vars): Queue
    {
        if (!self::$instance) {
            self::$instance = new self(...$vars);
        }

        return self::$instance;
    }

    private function __clone()
    {
        // 单例，私有__clone方法
    }

    /**
     * Queue constructor.
     * @param $host
     * @param int $port
     * @param string $password
     */
    private function __construct($host, $port = 6379, $password = '')
    {
        $redis = new Redis();
        $redis->connect($host, $port);
        $redis->auth($password);
        $this->redis = $redis;
    }

    /**
     * 添加任务到队列
     * @param $key
     * @param $value
     * @param int $sec
     * @return bool
     */
    public function addDelayTask($key, $value, $sec = 10): bool
    {
        // 获取当前时间戳
        $expireTime = $this->getTime($sec);
        // 添加到有序集合
        return $this->redis->zAdd($key, $expireTime, $value) > 0;
    }

    /**
     * 删除延迟任务
     * @param $key
     * @param $value
     * @return bool
     */
    public function remDelayTask($key, $value): bool
    {
        // 从有序集合删除指定值
        return $this->redis->zRem($key, $value) > 0;
    }

    /**
     * 处理延迟任务
     * @param $key
     * @param $callback
     * @param int $sleep
     */
    public function handleDelayTask($key, $callback, $sleep = 500000)
    {
        // 死循环
        while (true) {
            // 尝试取出有序集合中第一个值
            $task = $this->redis->zRange($key, 0, 0, true);
            if (!empty($task)) {
                $value      = array_keys($task)[0];
                $expireTime = $task[$value];
                // 如果当前时间已经到达设定到执行时间
                // 则开始执行逻辑，并删除当前任务
                if ($this->getTime() >= $expireTime) {
                    call_user_func($callback, $value);
                    $this->remDelayTask($key, $value);
                    // 直接进入下一次循环，跳过usleep
                    continue;
                }
            }

            // 减少读取次数
            usleep($sleep);
        }
    }

    /**
     * 添加实时任务
     * @param $key
     * @param $value
     * @return false|int
     */
    public function addTask($key, $value)
    {
        // 添加到列表
        return $this->redis->lPush($key, $value);
    }

    /**
     * 删除实时任务
     * @param $key
     * @param $value
     * @return bool|int
     */
    public function remTask($key, $value)
    {
        // 从列表删除指定值
        return $this->redis->lRem($key, $value, 0);
    }

    /**
     * 处理实时队列
     * @param $key
     * @param $callback
     */
    public function handleTask($key, $callback)
    {
        // 死循环
        while (true) {
            // 阻塞读取列表中的值
            $task = $this->redis->brPop($key, 0);
            // 执行用户逻辑
            call_user_func($callback, $task[1]);
        }
    }

    /**
     * 获取现在时间戳（微秒）
     * @param int $sec
     * @return int
     */
    private function getTime($sec = 0): int
    {
        return time() + $sec;
    }

}
